[SPARK-31891][SQL] Support MSCK REPAIR TABLE .. [{ADD|DROP|SYNC} PARTITIONS]
#31499
Conversation
Kubernetes integration test starting

Kubernetes integration test status success

Test build #134961 has finished for PR 31499 at commit

Kubernetes integration test starting

Kubernetes integration test status success

Test build #134981 has finished for PR 31499 at commit
MSCK REPAIR TABLE .. [ADD|DROP|SYNC] PARTITIONS

@cloud-fan @dongjoon-hyun @viirya @HyukjinKwon May I ask you to review this PR.

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #134991 has finished for PR 31499 at commit

@cloud-fan @HyukjinKwon Any objections to the changes?

@dongjoon-hyun @viirya Are you ok with the changes?

Sorry for being late, @MaxGekk . I'll take a look at this during the weekend.
> **Syntax:** `[ database_name. ] table_name`

> * **`[[ADD|DROP|SYNC] PARTITIONS]`**

Is the syntax and function aiming to be identical with HIVE-17824?

The syntax is from the SQL standard, but the functionality is from Hive.
> * **`[[ADD|DROP|SYNC] PARTITIONS]`**

> * If the option is not specified, `MSCK REPAIR TABLE` adds partitions to the Hive external catalog only.

"The option" sounds a little mismatched because this is SQL syntax. Do we mention optional syntax like this elsewhere in our SQL docs?

I will remove "the option". I found it in another command: http://spark.apache.org/docs/latest/sql-ref-syntax-aux-analyze-table.html
> If no analyze option is specified, ANALYZE TABLE collects ...

but docs for other commands say "If specified, ...".
> * **`[[ADD|DROP|SYNC] PARTITIONS]`**

> * If the option is not specified, `MSCK REPAIR TABLE` adds partitions to the Hive external catalog only.
> * **ADD**, the command adds new partitions in the catalog for all sub-folders in the base table folder that don't belong to any table partitions.

Like line 42 (before this line) and line 44 (after this line), "the catalog" -> "the Hive external catalog" consistently?

Actually, the command calls the session catalog, which is an internal catalog that serves as a proxy to an external catalog (In-Memory or Hive). Strictly speaking, the command adds/drops partitions in the session catalog. Let me update the doc to be more precise.
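The ADD/DROP/SYNC semantics discussed in this thread amount to a set comparison between partition directories on disk and entries in the catalog. Here is a minimal sketch in Python, not Spark's actual implementation; `repair_table` and its parameters are hypothetical names used only for illustration:

```python
import os
import tempfile

def repair_table(catalog_parts, table_root, add=True, drop=False):
    """Hypothetical sketch of MSCK REPAIR TABLE semantics (not Spark's code).

    ADD registers partition directories missing from the catalog;
    DROP removes catalog entries whose directories are gone;
    SYNC (add=True, drop=True) does both.
    """
    # Partition directories follow the Hive layout: <root>/<col>=<value>
    fs_parts = {d for d in os.listdir(table_root)
                if os.path.isdir(os.path.join(table_root, d)) and "=" in d}
    added = fs_parts - catalog_parts if add else set()
    dropped = catalog_parts - fs_parts if drop else set()
    return (catalog_parts | added) - dropped
```

With `add=True, drop=False` this matches the legacy behavior of plain `MSCK REPAIR TABLE`; `drop=True` is what the new DROP/SYNC options enable.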
```
  | TRUNCATE TABLE multipartIdentifier partitionSpec?    #truncateTable
- | MSCK REPAIR TABLE multipartIdentifier                #repairTable
+ | MSCK REPAIR TABLE multipartIdentifier
+     (option=(ADD|DROP|SYNC) PARTITIONS)?               #repairTable
```

indentation?

fixed
@dongjoon-hyun Thank you for reviewing this PR. Regarding the syntax for alternatives, I wasn't sure what I should use in the docs. I just looked at existing examples:
- In docs (link) and in SqlBase.g4: spark/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4, line 136 in ba974ea
- In docs (link) and in SqlBase.g4: spark/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4, line 198 in ba974ea

You proposed

+1 for your suggestion (2).

@dongjoon-hyun I have looked at the SQL standard, it uses both notations: `{ UTF8 | UTF16 | UTF32 }`, `GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY` and `[ INSTANCE | STATIC | CONSTRUCTOR ] METHOD`. It seems we should use the syntax `MSCK REPAIR TABLE .. [{ADD|DROP|SYNC} PARTITIONS]`. The candidate forms were:
- `MSCK REPAIR TABLE .. [ADD|DROP|SYNC] PARTITIONS`
- `MSCK REPAIR TABLE .. [{ ADD | DROP | SYNC } PARTITIONS]`
- `MSCK REPAIR TABLE .. [{ADD|DROP|SYNC} PARTITIONS]`
Kubernetes integration test starting

Kubernetes integration test status failure

Test build #135175 has finished for PR 31499 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala (review thread resolved)

@dongjoon-hyun Could you take a look at this PR one more time, please.

Oops. Sorry, @MaxGekk . Could you rebase to the master once more please?
```scala
  dropPartSpecs,
  ignoreIfNotExists = true,
  purge = false,
  retainData = true)
```

Could you add a comment about the reason why we use retainData=true? I guess the reason is that fs.exists(..) is already false and we don't want additional file system calls. Did I understand correctly?

> ... we don't want additional file system calls. Did I understand correctly?

Yep, if we set retainData to true, the deleteData flag will be false at https://github.com/apache/hive/blob/release-3.1.3-rc0/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java#L4360-L4378 . So, Hive MetaStore will not try to delete the partition folders.
The same for the In-Memory catalog:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala, lines 464 to 473 in bfc0235:

```scala
if (existingParts.contains(p) && shouldRemovePartitionLocation) {
  val partitionPath = new Path(existingParts(p).location)
  try {
    val fs = partitionPath.getFileSystem(hadoopConfig)
    fs.delete(partitionPath, true)
  } catch {
    case e: IOException =>
      throw QueryExecutionErrors.unableToDeletePartitionPathError(partitionPath, e)
  }
}
```
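The retainData/deleteData relationship discussed in this thread can be modeled as follows. This is a hedged Python sketch, not Spark or Hive code; `drop_partitions`, `catalog`, and `fs` are illustrative stand-ins. Dropping with `retain_data=True` removes only the catalog entries, which is what the DROP/SYNC path wants since the backing directories are already gone and extra filesystem calls would be wasted:

```python
def drop_partitions(catalog, fs, specs, retain_data):
    """Illustrative model (not actual Spark/Hive code): drop partition
    entries from a catalog dict; delete backing locations from a fake
    filesystem set only when retain_data is False, mirroring
    deleteData = not retainData on the metastore side."""
    delete_data = not retain_data
    for spec in specs:
        location = catalog.pop(spec, None)
        # Stands in for fs.delete(partitionPath, true) in InMemoryCatalog.
        if location is not None and delete_data:
            fs.discard(location)
    return catalog, fs
```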
dongjoon-hyun left a comment:

+1, LGTM with one minor comment about adding a code comment.
Let's rebase and see the CI result.
…artitions

# Conflicts:
# sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala
Kubernetes integration test starting

Kubernetes integration test status success

Test build #135374 has finished for PR 31499 at commit

Thank you for updating, @MaxGekk !

Merged to master for Apache Spark 3.2.0.
> * **`{ADD|DROP|SYNC} PARTITIONS`**

> * If specified, `MSCK REPAIR TABLE` only adds partitions to the session catalog.

Typo: it should be "If not specified".

I think it's better to put it at the end, and say "If not specified, ADD is the default."

Here is the PR #31633
```scala
    checkPartitions(t, Map("part" -> "1"), Map("part" -> "2"))
    checkAnswer(sql(s"SELECT col, part FROM $t"), Seq(Row(1, 1), Row(0, 2)))
  }
}
```

Shall we test ADD?

We already have many tests for ADD in AlterTableRecoverPartitionsSuite since `ALTER TABLE .. RECOVER PARTITIONS` is semantically equal to `MSCK REPAIR TABLE .. ADD PARTITIONS`.

I see

@cloud-fan If you would like to test `MSCK REPAIR TABLE .. ADD PARTITIONS` explicitly, we could mix v1.AlterTableRecoverPartitionsSuiteBase into MsckRepairTableSuiteBase to run the existing tests automatically.

No need to.
late LGTM
…pairTableCommand`
### What changes were proposed in this pull request?
Rename the execution node `AlterTableRecoverPartitionsCommand` for the commands:
- `MSCK REPAIR TABLE table [{ADD|DROP|SYNC} PARTITIONS]`
- `ALTER TABLE table RECOVER PARTITIONS`
to `RepairTableCommand`.
### Why are the changes needed?
1. After the PR #31499, `ALTER TABLE table RECOVER PARTITIONS` is equal to `MSCK REPAIR TABLE table ADD PARTITIONS`. Mapping the generic command `MSCK REPAIR TABLE` to the more specific execution node `AlterTableRecoverPartitionsCommand` can confuse devs in the future.
2. `ALTER TABLE table RECOVER PARTITIONS` does not support any options/extensions. So, the additional parameters `enableAddPartitions` and `enableDropPartitions` in `AlterTableRecoverPartitionsCommand` are confusing as well.
### Does this PR introduce _any_ user-facing change?
No, because this is an internal API.
### How was this patch tested?
By running the existing test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsSuite"
$ build/sbt "test:testOnly *AlterTableRecoverPartitionsParserSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableSuite"
$ build/sbt "test:testOnly *MsckRepairTableParserSuite"
```
Closes #31635 from MaxGekk/rename-recover-partitions.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…TITIONS]`
### What changes were proposed in this pull request?
In the PR, I propose to extend the `MSCK REPAIR TABLE` command, and support new options `{ADD|DROP|SYNC} PARTITIONS`. In particular:
1. Extend the logical node `RepairTable`, and add two new flags `enableAddPartitions` and `enableDropPartitions`.
2. Add similar flags to the v1 execution node `AlterTableRecoverPartitionsCommand`
3. Add new method `dropPartitions()` to `AlterTableRecoverPartitionsCommand` which drops partitions from the catalog if their locations in the file system don't exist.
4. Updated public docs about the `MSCK REPAIR TABLE` command:
<img width="1037" alt="Screenshot 2021-02-16 at 13 46 39" src="https://user-images.githubusercontent.com/1580697/108052607-7446d280-705d-11eb-8e25-7398254787a4.png">
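The optional `{ADD|DROP|SYNC}` keyword described above maps onto the two new flags of `RepairTable`. A hedged sketch of that mapping (the function name and tuple layout are illustrative; the actual parser code in Spark differs), where a missing option keeps the legacy add-only behavior:

```python
def repair_flags(option):
    """Map the parsed {ADD|DROP|SYNC} PARTITIONS option to the
    (enableAddPartitions, enableDropPartitions) flags described in the PR.
    Illustrative only; not the actual Spark parser code."""
    return {
        None:   (True, False),  # plain MSCK REPAIR TABLE: legacy add-only behavior
        "ADD":  (True, False),
        "DROP": (False, True),
        "SYNC": (True, True),   # both add missing and drop dangling partitions
    }[option]
```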
Closes apache#31097
### Why are the changes needed?
- The changes allow recovering tables with removed partitions. The example below illustrates the problem:
```sql
spark-sql> create table tbl2 (col int, part int) partitioned by (part);
spark-sql> insert into tbl2 partition (part=1) select 1;
spark-sql> insert into tbl2 partition (part=0) select 0;
spark-sql> show table extended like 'tbl2' partition (part = 0);
default tbl2 false Partition Values: [part=0]
Location: file:/Users/maximgekk/proj/apache-spark/spark-warehouse/tbl2/part=0
...
```
Remove the partition (part = 0) from the filesystem:
```
$ rm -rf /Users/maximgekk/proj/apache-spark/spark-warehouse/tbl2/part=0
```
Even after recovering, we cannot query the table:
```sql
spark-sql> msck repair table tbl2;
spark-sql> select * from tbl2;
21/01/08 22:49:13 ERROR SparkSQLDriver: Failed in [select * from tbl2]
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/maximgekk/proj/apache-spark/spark-warehouse/tbl2/part=0
```
- To have feature parity with Hive: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RecoverPartitions(MSCKREPAIRTABLE)
### Does this PR introduce _any_ user-facing change?
Yes. After the changes, we can query the recovered table:
```sql
spark-sql> msck repair table tbl2 sync partitions;
spark-sql> select * from tbl2;
1 1
spark-sql> show partitions tbl2;
part=1
```
### How was this patch tested?
- By running the modified test suite:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableParserSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *PlanResolutionSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsParallelSuite"
```
- Added unified v1 and v2 tests for `MSCK REPAIR TABLE`:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableSuite"
```
Closes apache#31499 from MaxGekk/repair-table-drop-partitions.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>